package com.example.bankseckill.rabbitmq;

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.bankseckill.mapper.UserMapper;
import com.example.bankseckill.pojo.*;
import com.example.bankseckill.pojo.vo.ProductVo;
import com.example.bankseckill.service.IOrderService;
import com.example.bankseckill.service.ISeckillOrderService;
import com.example.bankseckill.service.ISeckillProductService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;


import java.io.IOException;
import java.util.Date;

/**
 * @Author: liyangjing
 * @Date: 2022/02/25/14:28
 * @Description:
 * 消息消费者
 */
@Component
@Slf4j
public class MQReceiver {

    public static long sumtime=0;

    public static long count=0;

    @Autowired
    private ISeckillProductService seckillProductService;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private IOrderService orderService;

    @Autowired
    private ISeckillOrderService iSeckillOrderService;

    @Autowired
    private UserMapper userMapper;
    /**
     * 下单操作
     * @param message
     */
    @RabbitListener(queues = SecKillMessageConstants.QUEUE_NAME)
    public void receive(Message message, Channel channel) throws IOException {
        long startTime = System.currentTimeMillis();
        //获取信息
        SeckillmessageLog seckillMessage = (SeckillmessageLog) message.getPayload();
        log.info("接受到的消息1"+seckillMessage);
        //消息头
        MessageHeaders headers = message.getHeaders();
        //消息序号
        long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        HashOperations hashOperations = redisTemplate.opsForHash();
        try{
            if (hashOperations.entries("seckill_log").containsKey(seckillMessage.getMsgId())) {
                //redis中包含key，说明消息已经被消费
                log.info("消息已经被消费=====>{}",seckillMessage.getMsgId());
                /**
                 * 手动确认消息
                 * tag:消息序号
                 * multiple:是否多条
                 */
                channel.basicAck(tag, false);
                return;
            }
            //判断是否需要此消息
            Long seckillproductId = seckillMessage.getSecproductId();
            Long userId = seckillMessage.getUserId();
            //判断库存
            ProductVo productVo = seckillProductService.findProductVoById(seckillproductId);
            if(productVo.getStockCount()<1){
                return;
            }
            //判断是否重复抢购
            SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.opsForValue().get("order:" + userId + ":" + productVo.getId());
            if(seckillOrder!=null){
                return;
            }
            //先在redis 缓存寻找用户
            User user = (User) redisTemplate.opsForValue().get("user:" + userId);
            if(user==null){
                user = userMapper.selectById(userId);
            }
            //下单操作
            Boolean result = orderService.seckill(user, productVo);
            //判断订单是否生成
            if(result){
                log.info("消息消费完成------》订单已经生成！");
                //将消息id存入redis
                hashOperations.put("seckill_log",seckillMessage.getMsgId(),"OK");
                //手动确认消息
                channel.basicAck(tag,false);
                long endTime = System.currentTimeMillis();
                log.info(log.getName()+"第"+(++count)+"次, "+" "+ (endTime-startTime));
                sumtime =sumtime+( endTime - startTime);
            }else {
                log.info("消息消费失败-------》订单生成失败！");
                /**
                 * 手动确认消息
                 * tag:消息序号
                 * multiple:是否多条
                 * requeue:是否回退到队列
                 */
                channel.basicNack(tag,false,true);
            }
        }catch (Exception e) {
            try {
                /**
                 * 手动确认消息
                 * tag:消息序号
                 * multiple:是否多条
                 * requeue:是否回退到队列
                 */
                channel.basicNack(tag,false,true);
            } catch (IOException ex) {
                log.error("消息确认失败=====>{}", ex.getMessage());
            }
            log.error("消息消费失败=====>{}", e.getMessage());
        }
    }

    @RabbitListener(queues = SecKillMessageConstants.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message,Channel channel) throws IOException {
        SeckillOrder seckillOrder =(SeckillOrder)message.getPayload();
        log.info("当前时间：{},收到延时队列的消息：{}", new Date().toString(),"秒杀订单的id为"+seckillOrder.getId());
        //查询数据库订单状态
        Order order = orderService.getById(seckillOrder.getOrderId());
        //若是未支付状态
        if(order.getStatus()==0){
            //修改订单状态
            order.setStatus(-1);
            orderService.updateById(order);
            //删除改用户的秒杀订单（MYsql）和（Redis）
            iSeckillOrderService.removeById(seckillOrder.getId());
            //redis
            redisTemplate.delete("order:"+seckillOrder.getUserId()+":"+seckillOrder.getSeckillProduct().getId());
            //返回库存redis
            redisTemplate.opsForValue().increment("seckillProducts:" + seckillOrder.getSeckillProduct().getId());
            //mysql返回库存
            seckillProductService.update( new UpdateWrapper<SeckillProduct>().setSql("stock_count = stock_count+1")
                        .eq("id", seckillOrder.getSeckillProduct().getId()));

        }
        //消息头
        MessageHeaders headers = message.getHeaders();
        //消息序号
        long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        /**
         * 手动确认消息
         * tag:消息序号
         * multiple:是否多条
         */
        channel.basicAck(tag, false);
    }
}
